// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
// vim: ts=8 sw=2 smarttab

#include "rgw_notify.h"
#include "cls/2pc_queue/cls_2pc_queue_client.h"
#include "cls/lock/cls_lock_client.h"
#include <memory>
#include <boost/algorithm/hex.hpp>
#include <boost/context/protected_fixedsize_stack.hpp>
#include <spawn/spawn.hpp>
#include "rgw_sal_rados.h"
#include "rgw_pubsub.h"
#include "rgw_pubsub_push.h"
#include "rgw_perf_counters.h"
#include "common/dout.h"
#include <chrono>

#define dout_subsys ceph_subsys_rgw

namespace rgw::notify {

struct event_entry_t {
  rgw_pubsub_s3_event event;
  std::string push_endpoint;
  std::string push_endpoint_args;
  std::string arn_topic;
  
  void encode(bufferlist& bl) const {
    ENCODE_START(1, 1, bl);
    encode(event, bl);
    encode(push_endpoint, bl);
    encode(push_endpoint_args, bl);
    encode(arn_topic, bl);
    ENCODE_FINISH(bl);
  }

  void decode(bufferlist::const_iterator& bl) {
    DECODE_START(1, bl);
    decode(event, bl);
    decode(push_endpoint, bl);
    decode(push_endpoint_args, bl);
    decode(arn_topic, bl);
    DECODE_FINISH(bl);
  }
};
WRITE_CLASS_ENCODER(event_entry_t)

using queues_t = std::set<std::string>;

// use mmap/mprotect to allocate 128k coroutine stacks
auto make_stack_allocator() {
  return boost::context::protected_fixedsize_stack{128*1024};
}

class Manager : public DoutPrefixProvider {
  const size_t max_queue_size;
  const uint32_t queues_update_period_ms;
  const uint32_t queues_update_retry_ms;
  const uint32_t queue_idle_sleep_us;
  const utime_t failover_time;
  CephContext* const cct;
  librados::IoCtx& rados_ioctx;
  static constexpr auto COOKIE_LEN = 16;
  const std::string lock_cookie;
  boost::asio::io_context io_context;
  boost::asio::executor_work_guard<boost::asio::io_context::executor_type> work_guard;
  const uint32_t worker_count;
  std::vector<std::thread> workers;
  const uint32_t stale_reservations_period_s;
  const uint32_t reservations_cleanup_period_s;
 
  const std::string Q_LIST_OBJECT_NAME = "queues_list_object";

  CephContext *get_cct() const override { return cct; }
  unsigned get_subsys() const override { return dout_subsys; }
  std::ostream& gen_prefix(std::ostream& out) const override { return out << "rgw notify: "; }

  // read the list of queues from the queue list object
  int read_queue_list(queues_t& queues, optional_yield y) {
    constexpr auto max_chunk = 1024U;
    std::string start_after;
    bool more = true;
    int rval;
    while (more) {
      librados::ObjectReadOperation op;
      queues_t queues_chunk;
      op.omap_get_keys2(start_after, max_chunk, &queues_chunk, &more, &rval);
      const auto ret = rgw_rados_operate(this, rados_ioctx, Q_LIST_OBJECT_NAME, &op, nullptr, y);
      if (ret == -ENOENT) {
        // queue list object was not created - nothing to do
        return 0;
      }
      if (ret < 0) {
        // TODO: do we need to check on rval as well as ret?
        ldpp_dout(this, 1) << "ERROR: failed to read queue list. error: " << ret << dendl;
        return ret;
      }
      queues.merge(queues_chunk);
    }
    return 0;
  }

  // set m1 to be the minimum between m1 and m2
  static int set_min_marker(std::string& m1, const std::string m2) {
    cls_queue_marker mr1;
    cls_queue_marker mr2;
    if (mr1.from_str(m1.c_str()) < 0 || mr2.from_str(m2.c_str()) < 0) {
      return -EINVAL;
    }
    if (mr2.gen <= mr1.gen && mr2.offset < mr1.offset) {
      m1 = m2;
    }
    return 0;
  }

  using Clock = ceph::coarse_mono_clock;
  using Executor = boost::asio::io_context::executor_type;
  using Timer = boost::asio::basic_waitable_timer<Clock,
        boost::asio::wait_traits<Clock>, Executor>;

  class tokens_waiter {
    const std::chrono::hours infinite_duration;
    size_t pending_tokens;
    Timer timer;
 
    struct token {
      tokens_waiter& waiter;
      token(tokens_waiter& _waiter) : waiter(_waiter) {
        ++waiter.pending_tokens;
      }
      
      ~token() {
        --waiter.pending_tokens;
        if (waiter.pending_tokens == 0) {
          waiter.timer.cancel();
        }   
      }   
    };
  
  public:

    tokens_waiter(boost::asio::io_context& io_context) :
      infinite_duration(1000),
      pending_tokens(0),
      timer(io_context) {}  
 
    void async_wait(yield_context yield) {
      if (pending_tokens == 0) {
        return;
      }
      timer.expires_from_now(infinite_duration);
      boost::system::error_code ec; 
      timer.async_wait(yield[ec]);
      ceph_assert(ec == boost::system::errc::operation_canceled);
    }   
 
    token make_token() {    
      return token(*this);
    }   
  };

  // processing of a specific entry
  // return whether processing was successfull (true) or not (false)
  bool process_entry(const cls_queue_entry& entry, yield_context yield) {
    event_entry_t event_entry;
    auto iter = entry.data.cbegin();
    try {
      decode(event_entry, iter);
    } catch (buffer::error& err) {
      ldpp_dout(this, 5) << "WARNING: failed to decode entry. error: " << err.what() << dendl;
      return false;
    }
    try {
      // TODO move endpoint creation to queue level
      const auto push_endpoint = RGWPubSubEndpoint::create(event_entry.push_endpoint, event_entry.arn_topic,
          RGWHTTPArgs(event_entry.push_endpoint_args, this), 
          cct);
      ldpp_dout(this, 20) << "INFO: push endpoint created: " << event_entry.push_endpoint <<
        " for entry: " << entry.marker << dendl;
      const auto ret = push_endpoint->send_to_completion_async(cct, event_entry.event, optional_yield(io_context, yield));
      if (ret < 0) {
        ldpp_dout(this, 5) << "WARNING: push entry: " << entry.marker << " to endpoint: " << event_entry.push_endpoint 
          << " failed. error: " << ret << " (will retry)" << dendl;
        return false;
      } else {
        ldpp_dout(this, 20) << "INFO: push entry: " << entry.marker << " to endpoint: " << event_entry.push_endpoint 
          << " ok" <<  dendl;
        if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
        return true;
      }
    } catch (const RGWPubSubEndpoint::configuration_error& e) {
      ldpp_dout(this, 5) << "WARNING: failed to create push endpoint: " 
          << event_entry.push_endpoint << " for entry: " << entry.marker << ". error: " << e.what() << " (will retry) " << dendl;
      return false;
    }
  }

  // clean stale reservation from queue
  void cleanup_queue(const std::string& queue_name, yield_context yield) {
    while (true) {
      ldpp_dout(this, 20) << "INFO: trying to perform stale reservation cleanup for queue: " << queue_name << dendl;
      const auto now = ceph::coarse_real_time::clock::now();
      const auto stale_time = now - std::chrono::seconds(stale_reservations_period_s);
      librados::ObjectWriteOperation op;
      op.assert_exists();
      rados::cls::lock::assert_locked(&op, queue_name+"_lock", 
        ClsLockType::EXCLUSIVE,
        lock_cookie, 
        "" /*no tag*/);
      cls_2pc_queue_expire_reservations(op, stale_time);
      // check ownership and do reservation cleanup in one batch
      auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield));
      if (ret == -ENOENT) {
        // queue was deleted
        ldpp_dout(this, 5) << "INFO: queue: " 
          << queue_name << ". was removed. cleanup will stop" << dendl;
        return;
      }
      if (ret == -EBUSY) {
        ldpp_dout(this, 5) << "WARNING: queue: " << queue_name << " ownership moved to another daemon. processing will stop" << dendl;
        return;
      }
      if (ret < 0) {
        ldpp_dout(this, 5) << "WARNING: failed to cleanup stale reservation from queue and/or lock queue: " << queue_name
          << ". error: " << ret << dendl;
      }
      Timer timer(io_context);
      timer.expires_from_now(std::chrono::seconds(reservations_cleanup_period_s));
      boost::system::error_code ec;
	    timer.async_wait(yield[ec]);
    }
  }

  // processing of a specific queue
  void process_queue(const std::string& queue_name, yield_context yield) {
    constexpr auto max_elements = 1024;
    auto is_idle = false;
    const std::string start_marker;

    // start a the cleanup coroutine for the queue
    spawn::spawn(io_context, [this, queue_name](yield_context yield) {
            cleanup_queue(queue_name, yield);
            }, make_stack_allocator());
    
    while (true) {
      // if queue was empty the last time, sleep for idle timeout
      if (is_idle) {
        Timer timer(io_context);
        timer.expires_from_now(std::chrono::microseconds(queue_idle_sleep_us));
        boost::system::error_code ec;
	      timer.async_wait(yield[ec]);
      }

      // get list of entries in the queue
      is_idle = true;
      bool truncated = false;
      std::string end_marker;
      std::vector<cls_queue_entry> entries;
      auto total_entries = 0U;
      {
        librados::ObjectReadOperation op;
        op.assert_exists();
        bufferlist obl;
        int rval;
        rados::cls::lock::assert_locked(&op, queue_name+"_lock", 
          ClsLockType::EXCLUSIVE,
          lock_cookie, 
          "" /*no tag*/);
        cls_2pc_queue_list_entries(op, start_marker, max_elements, &obl, &rval);
        // check ownership and list entries in one batch
        auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, nullptr, optional_yield(io_context, yield));
        if (ret == -ENOENT) {
          // queue was deleted
          ldpp_dout(this, 5) << "INFO: queue: " 
            << queue_name << ". was removed. processing will stop" << dendl;
          return;
        }
        if (ret == -EBUSY) {
          ldpp_dout(this, 5) << "WARNING: queue: " << queue_name << " ownership moved to another daemon. processing will stop" << dendl;
          return;
        }
        if (ret < 0) {
          ldpp_dout(this, 5) << "WARNING: failed to get list of entries in queue and/or lock queue: " 
            << queue_name << ". error: " << ret << " (will retry)" << dendl;
          continue;
        }
        ret = cls_2pc_queue_list_entries_result(obl, entries, &truncated, end_marker);
        if (ret < 0) {
          ldpp_dout(this, 5) << "WARNING: failed to parse list of entries in queue: " 
            << queue_name << ". error: " << ret << " (will retry)" << dendl;
          continue;
        }
      }
      total_entries = entries.size();
      if (total_entries == 0) {
        // nothing in the queue
        continue;
      }
      // log when queue is not idle
      ldpp_dout(this, 20) << "INFO: found: " << total_entries << " entries in: " << queue_name <<
        ". end marker is: " << end_marker << dendl;
      
      is_idle = false;
      auto has_error = false;
      auto remove_entries = false;
      auto entry_idx = 1U;
      tokens_waiter waiter(io_context);
      for (auto& entry : entries) {
        if (has_error) {
          // bail out on first error
          break;
        }
        // TODO pass entry pointer instead of by-value
        spawn::spawn(yield, [this, &queue_name, entry_idx, total_entries, &end_marker, &remove_entries, &has_error, &waiter, entry](yield_context yield) {
            const auto token = waiter.make_token();
            if (process_entry(entry, yield)) {
              ldpp_dout(this, 20) << "INFO: processing of entry: " << 
                entry.marker << " (" << entry_idx << "/" << total_entries << ") from: " << queue_name << " ok" << dendl;
              remove_entries = true;
            }  else {
              if (set_min_marker(end_marker, entry.marker) < 0) {
                ldpp_dout(this, 1) << "ERROR: cannot determin minimum between malformed markers: " << end_marker << ", " << entry.marker << dendl;
              } else {
                ldpp_dout(this, 20) << "INFO: new end marker for removal: " << end_marker << " from: " << queue_name << dendl;
              }
              has_error = true;
              ldpp_dout(this, 20) << "INFO: processing of entry: " << 
                entry.marker << " (" << entry_idx << "/" << total_entries << ") from: " << queue_name << " failed" << dendl;
            } 
        }, make_stack_allocator());
        ++entry_idx;
      }

      // wait for all pending work to finish
      waiter.async_wait(yield);

      // delete all published entries from queue
      if (remove_entries) {
        librados::ObjectWriteOperation op;
        op.assert_exists();
        rados::cls::lock::assert_locked(&op, queue_name+"_lock", 
          ClsLockType::EXCLUSIVE,
          lock_cookie, 
          "" /*no tag*/);
        cls_2pc_queue_remove_entries(op, end_marker); 
        // check ownership and deleted entries in one batch
        const auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield)); 
        if (ret == -ENOENT) {
          // queue was deleted
          ldpp_dout(this, 5) << "INFO: queue: " 
            << queue_name << ". was removed. processing will stop" << dendl;
          return;
        }
        if (ret == -EBUSY) {
          ldpp_dout(this, 5) << "WARNING: queue: " << queue_name << " ownership moved to another daemon. processing will stop" << dendl;
          return;
        }
        if (ret < 0) {
          ldpp_dout(this, 1) << "ERROR: failed to remove entries and/or lock queue up to: " << end_marker <<  " from queue: " 
            << queue_name << ". error: " << ret << dendl;
        } else {
          ldpp_dout(this, 20) << "INFO: removed entries up to: " << end_marker <<  " from queue: " 
          << queue_name << dendl;
        }
      }
    }
  }

  // lits of owned queues
  using owned_queues_t = std::unordered_set<std::string>;

  // process all queues
  // find which of the queues is owned by this daemon and process it
  void process_queues(yield_context yield) {
    auto has_error = false;
    owned_queues_t owned_queues;

    // add randomness to the duration between queue checking
    // to make sure that different daemons are not synced
    std::random_device seed;
    std::mt19937 rnd_gen(seed());
    const auto min_jitter = 100; // ms
    const auto max_jitter = 500; // ms
    std::uniform_int_distribution<> duration_jitter(min_jitter, max_jitter);

    std::vector<std::string> queue_gc;
    std::mutex queue_gc_lock;
    while (true) {
      Timer timer(io_context);
      const auto duration = (has_error ? 
        std::chrono::milliseconds(queues_update_retry_ms) : std::chrono::milliseconds(queues_update_period_ms)) + 
        std::chrono::milliseconds(duration_jitter(rnd_gen));
      timer.expires_from_now(duration);
      const auto tp = ceph::coarse_real_time::clock::to_time_t(ceph::coarse_real_time::clock::now() + duration);
      ldpp_dout(this, 20) << "INFO: next queues processing will happen at: " << std::ctime(&tp)  << dendl;
      boost::system::error_code ec;
      timer.async_wait(yield[ec]);

      queues_t queues;
      auto ret = read_queue_list(queues, optional_yield(io_context, yield));
      if (ret < 0) {
        has_error = true;
        continue;
      }

      for (const auto& queue_name : queues) {
        // try to lock the queue to check if it is owned by this rgw
        // or if ownershif needs to be taken
        librados::ObjectWriteOperation op;
        op.assert_exists();
        rados::cls::lock::lock(&op, queue_name+"_lock", 
              ClsLockType::EXCLUSIVE,
              lock_cookie, 
              "" /*no tag*/,
              "" /*no description*/,
              failover_time,
              LOCK_FLAG_MAY_RENEW);

        ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield));
        if (ret == -EBUSY) {
          // lock is already taken by another RGW
          ldpp_dout(this, 20) << "INFO: queue: " << queue_name << " owned (locked) by another daemon" << dendl;
          // if queue was owned by this RGW, processing should be stopped, queue would be deleted from list afterwards
          continue;
        }
        if (ret == -ENOENT) {
          // queue is deleted - processing will stop the next time we try to read from the queue
          ldpp_dout(this, 10) << "INFO: queue: " << queue_name << " should not be locked - already deleted" << dendl;
          continue;
        }
        if (ret < 0) {
          // failed to lock for another reason, continue to process other queues
          ldpp_dout(this, 1) << "ERROR: failed to lock queue: " << queue_name << ". error: " << ret << dendl;
          has_error = true;
          continue;
        }
        // add queue to list of owned queues
        if (owned_queues.insert(queue_name).second) {
          ldpp_dout(this, 10) << "INFO: queue: " << queue_name << " now owned (locked) by this daemon" << dendl;
          // start processing this queue
          spawn::spawn(io_context, [this, &queue_gc, &queue_gc_lock, queue_name](yield_context yield) {
            process_queue(queue_name, yield);
            // if queue processing ended, it measn that the queue was removed or not owned anymore
            // mark it for deletion
            std::lock_guard lock_guard(queue_gc_lock);
            queue_gc.push_back(queue_name);
            ldpp_dout(this, 10) << "INFO: queue: " << queue_name << " marked for removal" << dendl;
          }, make_stack_allocator());
        } else {
          ldpp_dout(this, 20) << "INFO: queue: " << queue_name << " ownership (lock) renewed" << dendl;
        }
      }
      // erase all queue that were deleted
      {
        std::lock_guard lock_guard(queue_gc_lock);
        std::for_each(queue_gc.begin(), queue_gc.end(), [this, &owned_queues](const std::string& queue_name) {
          owned_queues.erase(queue_name);
          ldpp_dout(this, 20) << "INFO: queue: " << queue_name << " removed" << dendl;
        });
        queue_gc.clear();
      }
    }
  }

public:

  ~Manager() {
    work_guard.reset();
    io_context.stop();
    std::for_each(workers.begin(), workers.end(), [] (auto& worker) { worker.join(); });
  }

  // ctor: start all threads
  Manager(CephContext* _cct, uint32_t _max_queue_size, uint32_t _queues_update_period_ms, 
          uint32_t _queues_update_retry_ms, uint32_t _queue_idle_sleep_us, u_int32_t failover_time_ms, 
          uint32_t _stale_reservations_period_s, uint32_t _reservations_cleanup_period_s,
          uint32_t _worker_count, rgw::sal::RadosStore* store) :
    max_queue_size(_max_queue_size),
    queues_update_period_ms(_queues_update_period_ms),
    queues_update_retry_ms(_queues_update_retry_ms),
    queue_idle_sleep_us(_queue_idle_sleep_us),
    failover_time(std::chrono::milliseconds(failover_time_ms)),
    cct(_cct),
    rados_ioctx(store->getRados()->get_notif_pool_ctx()),
    lock_cookie(gen_rand_alphanumeric(cct, COOKIE_LEN)),
    work_guard(boost::asio::make_work_guard(io_context)),
    worker_count(_worker_count),
    stale_reservations_period_s(_stale_reservations_period_s),
    reservations_cleanup_period_s(_reservations_cleanup_period_s)
    {
      spawn::spawn(io_context, [this] (yield_context yield) {
            process_queues(yield);
          }, make_stack_allocator());

      // start the worker threads to do the actual queue processing
      const std::string WORKER_THREAD_NAME = "notif-worker";
      for (auto worker_id = 0U; worker_id < worker_count; ++worker_id) {
        workers.emplace_back([this]() {
          try {
            io_context.run(); 
          } catch (const std::exception& err) {
            ldpp_dout(this, 10) << "Notification worker failed with error: " << err.what() << dendl;
            throw(err);
          }
        });
        const auto rc = ceph_pthread_setname(workers.back().native_handle(), 
          (WORKER_THREAD_NAME+std::to_string(worker_id)).c_str());
        ceph_assert(rc == 0);
      }
      ldpp_dout(this, 10) << "Started notification manager with: " << worker_count << " workers" << dendl;
    }

  int add_persistent_topic(const std::string& topic_name, optional_yield y) {
    if (topic_name == Q_LIST_OBJECT_NAME) {
      ldpp_dout(this, 1) << "ERROR: topic name cannot be: " << Q_LIST_OBJECT_NAME << " (conflict with queue list object name)" << dendl;
      return -EINVAL;
    }
    librados::ObjectWriteOperation op;
    op.create(true);
    cls_2pc_queue_init(op, topic_name, max_queue_size);
    auto ret = rgw_rados_operate(this, rados_ioctx, topic_name, &op, y);
    if (ret == -EEXIST) {
      // queue already exists - nothing to do
      ldpp_dout(this, 20) << "INFO: queue for topic: " << topic_name << " already exists. nothing to do" << dendl;
      return 0;
    }
    if (ret < 0) {
      // failed to create queue
      ldpp_dout(this, 1) << "ERROR: failed to create queue for topic: " << topic_name << ". error: " << ret << dendl;
      return ret;
    }
   
    bufferlist empty_bl;
    std::map<std::string, bufferlist> new_topic{{topic_name, empty_bl}};
    op.omap_set(new_topic);
    ret = rgw_rados_operate(this, rados_ioctx, Q_LIST_OBJECT_NAME, &op, y);
    if (ret < 0) {
      ldpp_dout(this, 1) << "ERROR: failed to add queue: " << topic_name << " to queue list. error: " << ret << dendl;
      return ret;
    } 
    ldpp_dout(this, 20) << "INFO: queue: " << topic_name << " added to queue list"  << dendl;
    return 0;
  }
  
  int remove_persistent_topic(const std::string& topic_name, optional_yield y) {
    librados::ObjectWriteOperation op;
    op.remove();
    auto ret = rgw_rados_operate(this, rados_ioctx, topic_name, &op, y);
    if (ret == -ENOENT) {
      // queue already removed - nothing to do
      ldpp_dout(this, 20) << "INFO: queue for topic: " << topic_name << " already removed. nothing to do" << dendl;
      return 0;
    }
    if (ret < 0) {
      // failed to remove queue
      ldpp_dout(this, 1) << "ERROR: failed to remove queue for topic: " << topic_name << ". error: " << ret << dendl;
      return ret;
    }
  
    std::set<std::string> topic_to_remove{{topic_name}};
    op.omap_rm_keys(topic_to_remove);
    ret = rgw_rados_operate(this, rados_ioctx, Q_LIST_OBJECT_NAME, &op, y);
    if (ret < 0) {
      ldpp_dout(this, 1) << "ERROR: failed to remove queue: " << topic_name << " from queue list. error: " << ret << dendl;
      return ret;
    } 
    ldpp_dout(this, 20) << "INFO: queue: " << topic_name << " removed from queue list"  << dendl;
    return 0;
  }
};

// singleton manager
// note that the manager itself is not a singleton, and multiple instances may co-exist
// TODO make the pointer atomic in allocation and deallocation to avoid race conditions
static Manager* s_manager = nullptr;

constexpr size_t MAX_QUEUE_SIZE = 128*1000*1000; // 128MB
constexpr uint32_t Q_LIST_UPDATE_MSEC = 1000*30;     // check queue list every 30seconds
constexpr uint32_t Q_LIST_RETRY_MSEC = 1000;         // retry every second if queue list update failed
constexpr uint32_t IDLE_TIMEOUT_USEC = 100*1000;     // idle sleep 100ms
constexpr uint32_t FAILOVER_TIME_MSEC = 3*Q_LIST_UPDATE_MSEC; // FAILOVER TIME 3x renew time
constexpr uint32_t WORKER_COUNT = 1;                 // 1 worker thread
constexpr uint32_t STALE_RESERVATIONS_PERIOD_S = 120;   // cleanup reservations that are more than 2 minutes old
constexpr uint32_t RESERVATIONS_CLEANUP_PERIOD_S = 30; // reservation cleanup every 30 seconds

bool init(CephContext* cct, rgw::sal::RadosStore* store, const DoutPrefixProvider *dpp) {
  if (s_manager) {
    return false;
  }
  // TODO: take conf from CephContext
  s_manager = new Manager(cct, MAX_QUEUE_SIZE, 
      Q_LIST_UPDATE_MSEC, Q_LIST_RETRY_MSEC, 
      IDLE_TIMEOUT_USEC, FAILOVER_TIME_MSEC, 
      STALE_RESERVATIONS_PERIOD_S, RESERVATIONS_CLEANUP_PERIOD_S,
      WORKER_COUNT,
      store);
  return true;
}

void shutdown() {
  delete s_manager;
  s_manager = nullptr;
}

int add_persistent_topic(const std::string& topic_name, optional_yield y) {
  if (!s_manager) {
    return -EAGAIN;
  }
  return s_manager->add_persistent_topic(topic_name, y);
}

int remove_persistent_topic(const std::string& topic_name, optional_yield y) {
  if (!s_manager) {
    return -EAGAIN;
  }
  return s_manager->remove_persistent_topic(topic_name, y);
}

rgw::sal::Object* get_object_with_atttributes(
  const reservation_t& res, rgw::sal::Object* obj) {
  // in case of copy obj, the tags and metadata are taken from source
  const auto src_obj = res.src_object ? res.src_object : obj;
  if (src_obj->get_attrs().empty()) {
    if (!src_obj->get_bucket()) {
      src_obj->set_bucket(res.bucket);
    }
    if (src_obj->get_obj_attrs(res.yield, res.dpp) < 0) {
      return nullptr;
    }
  }
  return src_obj;
}

static inline void metadata_from_attributes(
  reservation_t& res, rgw::sal::Object* obj) {
  auto& metadata = res.x_meta_map;
  const auto src_obj = get_object_with_atttributes(res, obj);
  if (!src_obj) {
    return;
  }
  for (auto& attr : src_obj->get_attrs()) {
    if (boost::algorithm::starts_with(attr.first, RGW_ATTR_META_PREFIX)) {
      std::string_view key(attr.first);
      key.remove_prefix(sizeof(RGW_ATTR_PREFIX)-1);
      // we want to pass a null terminated version
      // of the bufferlist, hence "to_str().c_str()"
      metadata.emplace(key, attr.second.to_str().c_str());
    }
  }
}

static inline void tags_from_attributes(
  const reservation_t& res, rgw::sal::Object* obj, KeyMultiValueMap& tags) {
  const auto src_obj = get_object_with_atttributes(res, obj);
  if (!src_obj) {
    return;
  }
  const auto& attrs = src_obj->get_attrs();
  const auto attr_iter = attrs.find(RGW_ATTR_TAGS);
  if (attr_iter != attrs.end()) {
    auto bliter = attr_iter->second.cbegin();
    RGWObjTags obj_tags;
    try {
      ::decode(obj_tags, bliter);
    } catch(buffer::error&) {
      // not able to decode tags
      return;
    }
    tags = std::move(obj_tags.get_tags());
  }
}

// populate event from request
static inline void populate_event(reservation_t& res,
        rgw::sal::Object* obj,
        uint64_t size,
        const ceph::real_time& mtime, 
        const std::string& etag, 
        const std::string& version, 
        EventType event_type,
        rgw_pubsub_s3_event& event) {
  event.eventTime = mtime;
  event.eventName = to_event_string(event_type);
  event.userIdentity = res.user_id;    // user that triggered the change
  event.x_amz_request_id = res.req_id; // request ID of the original change
  event.x_amz_id_2 = res.store->getRados()->host_id; // RGW on which the change was made
  // configurationId is filled from notification configuration
  event.bucket_name = res.bucket->get_name();
  event.bucket_ownerIdentity = res.bucket->get_owner() ? res.bucket->get_owner()->get_id().id : "";
  const auto region = res.store->get_zone()->get_zonegroup().get_api_name();
  rgw::ARN bucket_arn(res.bucket->get_key());
  bucket_arn.region = region; 
  event.bucket_arn = to_string(bucket_arn);
  event.object_key = res.object_name ? *res.object_name : obj->get_name();
  event.object_size = size;
  event.object_etag = etag;
  event.object_versionId = version;
  event.awsRegion = region;
  // use timestamp as per key sequence id (hex encoded)
  const utime_t ts(real_clock::now());
  boost::algorithm::hex((const char*)&ts, (const char*)&ts + sizeof(utime_t), 
          std::back_inserter(event.object_sequencer));
  set_event_id(event.id, etag, ts);
  event.bucket_id = res.bucket->get_bucket_id();
  // pass meta data
  if (res.x_meta_map.empty()) {
    // no metadata cached:
    // either no metadata exist or no metadata filter was used
    metadata_from_attributes(res, obj);
  } else {
      event.x_meta_map = res.x_meta_map;
  }
  // pass tags
  if (!res.tagset ||
      (*res.tagset).get_tags().empty()) {
    // try to fetch the tags from the attributes
    tags_from_attributes(res, obj, event.tags);
  } else {
    event.tags = (*res.tagset).get_tags();
  }
  // opaque data will be filled from topic configuration
}

static inline bool notification_match(reservation_t& res,
				      const rgw_pubsub_topic_filter& filter,
				      EventType event,
				      const RGWObjTags* req_tags) {
  if (!match(filter.events, event)) { 
    return false;
  }
  const auto obj = res.object;
  if (!match(filter.s3_filter.key_filter, 
        res.object_name ? *res.object_name : obj->get_name())) {
    return false;
  }

  if (!filter.s3_filter.metadata_filter.kv.empty()) {
    // metadata filter exists
    if (res.s) {
      res.x_meta_map = res.s->info.x_meta_map;
    }
    metadata_from_attributes(res, obj);
    if (!match(filter.s3_filter.metadata_filter, res.x_meta_map)) {
      return false;
    }
  }

  if (!filter.s3_filter.tag_filter.kv.empty()) {
    // tag filter exists
    if (req_tags) {
      // tags in the request
      if (!match(filter.s3_filter.tag_filter, req_tags->get_tags())) {
        return false;
      }
    } else if (res.tagset && !(*res.tagset).get_tags().empty()) {
      // tags were cached in req_state
      if (!match(filter.s3_filter.tag_filter, (*res.tagset).get_tags())) {
        return false;
      }
    } else {
      // try to fetch tags from the attributes
      KeyMultiValueMap tags;
      tags_from_attributes(res, obj, tags);
      if (!match(filter.s3_filter.tag_filter, tags)) {
        return false;
      }
    }
  }

  return true;
}

  int publish_reserve(const DoutPrefixProvider* dpp,
		      EventType event_type,
		      reservation_t& res,
		      const RGWObjTags* req_tags)
{
  RGWPubSub ps(res.store, res.user_tenant);
  RGWPubSub::Bucket ps_bucket(&ps, res.bucket->get_key());
  rgw_pubsub_bucket_topics bucket_topics;
  auto rc = ps_bucket.get_topics(&bucket_topics);
  if (rc < 0) {
    // failed to fetch bucket topics
    return rc;
  }
  for (const auto& bucket_topic : bucket_topics.topics) {
    const rgw_pubsub_topic_filter& topic_filter = bucket_topic.second;
    const rgw_pubsub_topic& topic_cfg = topic_filter.topic;
    if (!notification_match(res, topic_filter, event_type, req_tags)) {
      // notification does not apply to req_state
      continue;
    }
    ldpp_dout(res.dpp, 20) << "INFO: notification: '" << topic_filter.s3_id <<
        "' on topic: '" << topic_cfg.dest.arn_topic << 
        "' and bucket: '" << res.bucket->get_name() <<
        "' (unique topic: '" << topic_cfg.name <<
        "') apply to event of type: '" << to_string(event_type) << "'" << dendl;

    cls_2pc_reservation::id_t res_id;
    if (topic_cfg.dest.persistent) {
      // TODO: take default reservation size from conf
      constexpr auto DEFAULT_RESERVATION = 4*1024U; // 4K
      res.size = DEFAULT_RESERVATION;
      librados::ObjectWriteOperation op;
      bufferlist obl;
      int rval;
      const auto& queue_name = topic_cfg.dest.arn_topic;
      cls_2pc_queue_reserve(op, res.size, 1, &obl, &rval);
      auto ret = rgw_rados_operate(
	res.dpp, res.store->getRados()->get_notif_pool_ctx(),
	queue_name, &op, res.yield, librados::OPERATION_RETURNVEC);
      if (ret < 0) {
        ldpp_dout(res.dpp, 1) <<
	  "ERROR: failed to reserve notification on queue: "
			      << queue_name << ". error: " << ret << dendl;
        // if no space is left in queue we ask client to slow down
        return (ret == -ENOSPC) ? -ERR_RATE_LIMITED : ret;
      }
      ret = cls_2pc_queue_reserve_result(obl, res_id);
      if (ret < 0) {
        ldpp_dout(res.dpp, 1) << "ERROR: failed to parse reservation id. error: " << ret << dendl;
        return ret;
      }
    }
    res.topics.emplace_back(topic_filter.s3_id, topic_cfg, res_id);
  }
  return 0;
}

int publish_commit(rgw::sal::Object* obj,
		   uint64_t size,
		   const ceph::real_time& mtime,
		   const std::string& etag,
		   const std::string& version,
		   EventType event_type,
		   reservation_t& res,
		   const DoutPrefixProvider* dpp)
{
  for (auto& topic : res.topics) {
    if (topic.cfg.dest.persistent &&
	topic.res_id == cls_2pc_reservation::NO_ID) {
      // nothing to commit or already committed/aborted
      continue;
    }
    event_entry_t event_entry;
    populate_event(res, obj, size, mtime, etag, version, event_type, event_entry.event);
    event_entry.event.configurationId = topic.configurationId;
    event_entry.event.opaque_data = topic.cfg.opaque_data;
    if (topic.cfg.dest.persistent) { 
      event_entry.push_endpoint = std::move(topic.cfg.dest.push_endpoint);
      event_entry.push_endpoint_args =
	std::move(topic.cfg.dest.push_endpoint_args);
      event_entry.arn_topic = topic.cfg.dest.arn_topic;
      bufferlist bl;
      encode(event_entry, bl);
      const auto& queue_name = topic.cfg.dest.arn_topic;
      if (bl.length() > res.size) {
        // try to make a larger reservation, fail only if this is not possible
        ldpp_dout(dpp, 5) << "WARNING: committed size: " << bl.length()
			  << " exceeded reserved size: " << res.size
			  <<
          " . trying to make a larger reservation on queue:" << queue_name
			  << dendl;
        // first cancel the existing reservation
        librados::ObjectWriteOperation op;
        cls_2pc_queue_abort(op, topic.res_id);
        auto ret = rgw_rados_operate(
	  dpp, res.store->getRados()->get_notif_pool_ctx(),
	  topic.cfg.dest.arn_topic, &op,
	  res.yield);
        if (ret < 0) {
          ldpp_dout(dpp, 1) << "ERROR: failed to abort reservation: "
			    << topic.res_id << 
            " when trying to make a larger reservation on queue: " << queue_name
			    << ". error: " << ret << dendl;
          return ret;
        }
        // now try to make a bigger one
	buffer::list obl;
        int rval;
        cls_2pc_queue_reserve(op, bl.length(), 1, &obl, &rval);
        ret = rgw_rados_operate(
	  dpp, res.store->getRados()->get_notif_pool_ctx(),
          queue_name, &op, res.yield, librados::OPERATION_RETURNVEC);
        if (ret < 0) {
          ldpp_dout(dpp, 1) << "ERROR: failed to reserve extra space on queue: "
			    << queue_name
			    << ". error: " << ret << dendl;
          return (ret == -ENOSPC) ? -ERR_RATE_LIMITED : ret;
        }
        ret = cls_2pc_queue_reserve_result(obl, topic.res_id);
        if (ret < 0) {
          ldpp_dout(dpp, 1) << "ERROR: failed to parse reservation id for "
	    "extra space. error: " << ret << dendl;
          return ret;
        }
      }
      std::vector<buffer::list> bl_data_vec{std::move(bl)};
      librados::ObjectWriteOperation op;
      cls_2pc_queue_commit(op, bl_data_vec, topic.res_id);
      const auto ret = rgw_rados_operate(
	dpp, res.store->getRados()->get_notif_pool_ctx(),
	queue_name, &op, res.yield);
      topic.res_id = cls_2pc_reservation::NO_ID;
      if (ret < 0) {
        ldpp_dout(dpp, 1) << "ERROR: failed to commit reservation to queue: "
			  << queue_name << ". error: " << ret
			  << dendl;
        return ret;
      }
    } else {
      try {
        // TODO add endpoint LRU cache
        const auto push_endpoint = RGWPubSubEndpoint::create(
	  topic.cfg.dest.push_endpoint,
	  topic.cfg.dest.arn_topic,
	  RGWHTTPArgs(topic.cfg.dest.push_endpoint_args, dpp),
	  dpp->get_cct());
        ldpp_dout(res.dpp, 20) << "INFO: push endpoint created: "
			       << topic.cfg.dest.push_endpoint << dendl;
        const auto ret = push_endpoint->send_to_completion_async(
	  dpp->get_cct(), event_entry.event, res.yield);
        if (ret < 0) {
          ldpp_dout(dpp, 1) << "ERROR: push to endpoint "
			    << topic.cfg.dest.push_endpoint
			    << " failed. error: " << ret << dendl;
          if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
          return ret;
        }
        if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
      } catch (const RGWPubSubEndpoint::configuration_error& e) {
        ldpp_dout(dpp, 1) << "ERROR: failed to create push endpoint: " 
            << topic.cfg.dest.push_endpoint << ". error: " << e.what() << dendl;
        if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
        return -EINVAL;
      }
    }
  }
  return 0;
}

int publish_abort(reservation_t& res) {
  for (auto& topic : res.topics) {
    if (!topic.cfg.dest.persistent ||
	topic.res_id == cls_2pc_reservation::NO_ID) {
      // nothing to abort or already committed/aborted
      continue;
    }
    const auto& queue_name = topic.cfg.dest.arn_topic;
    librados::ObjectWriteOperation op;
    cls_2pc_queue_abort(op, topic.res_id);
    const auto ret = rgw_rados_operate(
      res.dpp, res.store->getRados()->get_notif_pool_ctx(),
      queue_name, &op, res.yield);
    if (ret < 0) {
      ldpp_dout(res.dpp, 1) << "ERROR: failed to abort reservation: "
			    << topic.res_id <<
        " from queue: " << queue_name << ". error: " << ret << dendl;
      return ret;
    }
    topic.res_id = cls_2pc_reservation::NO_ID;
  }
  return 0;
}

reservation_t::reservation_t(const DoutPrefixProvider* _dpp,
			     rgw::sal::RadosStore* _store,
			     const req_state* _s,
			     rgw::sal::Object* _object,
			     rgw::sal::Object* _src_object,
			     const std::string* _object_name) :
  dpp(_s), store(_store), s(_s), size(0) /* XXX */,
  object(_object), src_object(_src_object), bucket(_s->bucket.get()),
  object_name(_object_name),
  tagset(_s->tagset),
  x_meta_map(_s->info.x_meta_map),
  user_id(_s->user->get_id().id),
  user_tenant(_s->user->get_id().tenant),
  req_id(_s->req_id),
  yield(_s->yield)
{}

reservation_t::reservation_t(const DoutPrefixProvider* _dpp,
			     rgw::sal::RadosStore* _store,
			     rgw::sal::Object* _object,
			     rgw::sal::Object* _src_object,
			     rgw::sal::Bucket* _bucket,
			     const std::string& _user_id,
			     const std::string& _user_tenant,
			     const std::string& _req_id,
			     optional_yield y) :
    dpp(_dpp), store(_store), s(nullptr), size(0) /* XXX */,
    object(_object), src_object(_src_object), bucket(_bucket),
    object_name(nullptr),
    user_id(_user_id),
    user_tenant(_user_tenant),
    req_id(_req_id),
    yield(y)
{}

reservation_t::~reservation_t() {
  publish_abort(*this);
}

} // namespace rgw::notify
